Misconfigured your Flink data repartitioning again? A detailed look at Flink's repartitioning operators
RebalancePartitioner RescalePartitioner KeyGroupStreamPartitioner GlobalPartitioner ShufflePartitioner ForwardPartitioner CustomPartitionerWrapper BroadcastPartitioner
1. Overview diagram
2. RebalancePartitioner
Partitioner that distributes the data equally by cycling through the output channels.
private int nextChannelToSendTo;

// Downstream channel selector: the first record goes to a randomly chosen downstream channel
public void setup(int numberOfChannels) {
    super.setup(numberOfChannels);
    nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}

// Subsequent records are sent round-robin: add 1, then take the modulo
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
    return nextChannelToSendTo;
}

// Distribution pattern is ALL_TO_ALL
public boolean isPointwise() { return false; }
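To make the round-robin behavior concrete, here is a minimal standalone sketch of the same selection logic. It is not Flink's class: `RoundRobinSketch` and `distribute` are names made up for illustration, and the sketch only counts how many records land on each channel.

```java
import java.util.concurrent.ThreadLocalRandom;

// Standalone sketch of round-robin channel selection; not Flink's RebalancePartitioner.
final class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    RoundRobinSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // Random starting point, mirroring the setup() excerpt above.
        this.nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    // Advances by one and wraps around, so channels are visited in a cycle.
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    // Sends `records` records and returns how many landed on each channel.
    static int[] distribute(int numberOfChannels, int records) {
        RoundRobinSketch selector = new RoundRobinSketch(numberOfChannels);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[selector.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = distribute(4, 1000);
        for (int channel = 0; channel < counts.length; channel++) {
            System.out.println("channel " + channel + " -> " + counts[channel]);
        }
    }
}
```

Whatever the random starting channel is, the per-channel counts never differ by more than one record, which is why rebalance is the usual remedy for skewed input.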
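Flink breaks a job's execution plan into three stages: StreamGraph -> JobGraph -> ExecutionGraph. The StreamingJobGraphGenerator class converts the StreamGraph into the JobGraph, and while doing so it calls the partitioner's isPointwise() method to choose the distribution pattern: POINTWISE or ALL_TO_ALL.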
JobEdge jobEdge;
if (partitioner.isPointwise()) {
    jobEdge =
            downStreamVertex.connectNewDataSetAsInput(
                    headVertex, DistributionPattern.POINTWISE, resultPartitionType);
} else {
    jobEdge =
            downStreamVertex.connectNewDataSetAsInput(
                    headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
}
3. RescalePartitioner
The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations will distribute to one downstream operation while the other two upstream operations will distribute to the other downstream operations. In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.
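Rescale differs from rebalance in two ways: the round-robin starts at the first downstream channel, and the distribution pattern is pointwise. Rescale keeps more of the processing local and reduces network I/O, so it performs better, but it balances data less evenly than rebalance.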
private int nextChannelToSendTo = -1;

// Downstream channel selector: round-robin starting from channel 0
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    if (++nextChannelToSendTo >= numberOfChannels) {
        nextChannelToSendTo = 0;
    }
    return nextChannelToSendTo;
}

// Distribution pattern is POINTWISE: each upstream subtask connects only to a subset of the downstream subtasks
public boolean isPointwise() { return true; }
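The Javadoc example (upstream parallelism 2, downstream parallelism 4) can be sketched as follows. This is only an illustration under stated assumptions: `RescaleSketch` and `targetsOf` are hypothetical names, the sketch covers only the case where the downstream parallelism is a multiple of the upstream one, and the contiguous assignment of downstream subtasks is assumed for readability. In Flink the actual wiring is decided when the execution graph is built, not by the partitioner.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of which downstream subtasks one upstream subtask reaches under rescale.
final class RescaleSketch {

    // Returns the downstream subtask indices for one upstream subtask.
    // Assumption: downstreamParallelism is a multiple of upstreamParallelism,
    // and each upstream subtask gets a contiguous slice of the downstream subtasks.
    static List<Integer> targetsOf(
            int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (downstreamParallelism % upstreamParallelism != 0) {
            throw new IllegalArgumentException("sketch only covers the evenly divisible case");
        }
        int fanOut = downstreamParallelism / upstreamParallelism;
        List<Integer> targets = new ArrayList<>();
        for (int local = 0; local < fanOut; local++) {
            // `local` is the channel index the partitioner cycles through (0, 1, ...).
            targets.add(upstreamIndex * fanOut + local);
        }
        return targets;
    }

    public static void main(String[] args) {
        for (int upstream = 0; upstream < 2; upstream++) {
            System.out.println("upstream " + upstream + " -> " + targetsOf(upstream, 2, 4));
        }
    }
}
```

Each upstream subtask only round-robins over its own slice, so a skewed upstream subtask passes its skew on to its slice. That is the balance trade-off against rebalance mentioned above.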
4. GlobalPartitioner
Partitioner that sends all elements to the downstream operator with subtask ID=0.
// Downstream channel selector: always returns 0
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return 0; }

// Distribution pattern is ALL_TO_ALL
public boolean isPointwise() { return false; }
5. ForwardPartitioner
Partitioner that forwards elements only to the locally running downstream operation.
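// Downstream channel selector: always returns 0, the single local channel
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return 0; }

// Distribution pattern is POINTWISE: point to point, each downstream subtask has exactly one input
public boolean isPointwise() { return true; }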
6. BroadcastPartitioner
Partitioner that selects all the output channels.
Upstream records are sent to every downstream channel, so the source code notes that selecting a channel is not supported.
/**
 * Note: Broadcast mode could be handled directly for all the output channels in record writer,
 * so it is no need to select channels via this method.
 */
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    throw new UnsupportedOperationException(
            "Broadcast partitioner does not support select channels.");
}
7. KeyGroupStreamPartitioner
Partitioner selects the target channel based on the key group index.
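In short: the key is hashed once with hashCode(), hashed a second time with murmurHash(keyHash), and then taken modulo the max parallelism (128 is the default lower bound) to produce the keyGroupId. The downstream channel index is then keyGroupId * parallelism / maxParallelism, and that index decides where each record is sent.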
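// Core logic: the max parallelism is system-defined; DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7, i.e. 128
public KeyedStream(
        DataStream<T> dataStream,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {
    this(
            dataStream,
            new PartitionTransformation<>(
                    dataStream.getTransformation(),
                    new KeyGroupStreamPartitioner<>(
                            keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
}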
// key is the grouping key, maxParallelism is system-defined (128 by default), numberOfChannels is the downstream parallelism set by the user
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException(
                "Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, maxParallelism, numberOfChannels);
}
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

// First hash: the key's own hashCode()
public static int assignToKeyGroup(Object key, int maxParallelism) {
    Preconditions.checkNotNull(key, "Assigned key must not be null!");
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

// Second hash (murmurHash), then modulo the max parallelism
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

// Apply the formula to get the target downstream channel index
public static int computeOperatorIndexForKeyGroup(
        int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}
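The chain from key to key group to downstream index can be tried out without a Flink dependency. The sketch below is an illustration only: `KeyGroupSketch` is a hypothetical class, and `mixHash` is a stand-in for `MathUtils.murmurHash`, a Murmur3-style mix written for this example that is not guaranteed to match Flink's output bit for bit. The key group and index arithmetic follows the excerpts above.

```java
// Standalone sketch of key -> key group -> downstream index; not Flink's own classes.
final class KeyGroupSketch {

    // Stand-in for MathUtils.murmurHash (an assumption, may differ from Flink's result):
    // Murmur3-style mixing of a 32-bit value, folded into the non-negative int range.
    static int mixHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // hashCode() first, the mixing hash second, then modulo the max parallelism.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return mixHash(key.hashCode()) % maxParallelism;
    }

    // keyGroupId * parallelism / maxParallelism, using integer division.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return operatorIndexForKeyGroup(
                maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            int index = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println(key + " -> key group " + keyGroup + " -> subtask " + index);
        }
    }
}
```

With maxParallelism 128 and parallelism 4, the formula reduces to keyGroupId / 32, so key groups 0 to 31 go to subtask 0, 32 to 63 to subtask 1, and so on. A key always maps to the same key group as long as maxParallelism stays fixed; changing the parallelism only changes which subtask owns each contiguous range of key groups.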
8. ShufflePartitioner
Partitioner that distributes the data equally by selecting one output channel randomly.
// Random distribution
private Random random = new Random();

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    return random.nextInt(numberOfChannels);
}

public boolean isPointwise() { return false; }
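Random selection is only balanced on average, not record by record. A small standalone sketch shows the effect. It is not Flink's class: `ShuffleSketch` and `distribute` are hypothetical names, and the fixed seed is an assumption added so that runs are reproducible (the excerpt above uses an unseeded `Random`).

```java
import java.util.Random;

// Standalone sketch of random channel selection; not Flink's ShufflePartitioner.
final class ShuffleSketch {

    // Sends `records` records to uniformly random channels and returns the per-channel counts.
    static int[] distribute(int numberOfChannels, int records, long seed) {
        Random random = new Random(seed); // seeded only to make the sketch reproducible
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[random.nextInt(numberOfChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = distribute(4, 10_000, 42L);
        for (int channel = 0; channel < counts.length; channel++) {
            System.out.println("channel " + channel + " -> " + counts[channel]);
        }
    }
}
```

The counts hover around records / numberOfChannels but are rarely exactly equal, whereas round-robin keeps them within one record of each other.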
9. CustomPartitionerWrapper
Partitions a DataStream on the key returned by the selector, using a custom partitioner. This method takes the key selector to get the key to partition on, and a partitioner that accepts the key type.
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
    return setConnectionType(
            new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
}
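A custom partitioner only has to map a key and the number of downstream partitions to an index. The sketch below shows one possible rule without pulling in Flink. The nested `Partitioner` interface is a local stand-in with the same shape as Flink's `Partitioner<K>` (`int partition(K key, int numPartitions)`), while `VipFirstPartitioner` and the `"vip-"` key prefix are hypothetical and exist only for this example.

```java
// Standalone sketch of a custom partitioning rule; the interface is a local stand-in.
final class CustomPartitionerSketch {

    // Same shape as Flink's Partitioner<K>, redeclared here so the sketch compiles on its own.
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Hypothetical rule: keys starting with "vip-" get partition 0 to themselves,
    // all other keys are hashed over the remaining partitions.
    static final class VipFirstPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            if (numPartitions == 1 || key.startsWith("vip-")) {
                return 0;
            }
            // floorMod avoids negative indices when hashCode() is negative.
            return 1 + Math.floorMod(key.hashCode(), numPartitions - 1);
        }
    }

    public static void main(String[] args) {
        Partitioner<String> partitioner = new VipFirstPartitioner();
        for (String key : new String[] {"vip-1", "user-7", "user-8"}) {
            System.out.println(key + " -> partition " + partitioner.partition(key, 4));
        }
    }
}
```

In a real job, a class like this would implement Flink's own `Partitioner<String>` and be passed to `partitionCustom` together with a key selector that extracts the string key from each record.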